package com.young.rocketmq.simpleexample.consumer;

import com.young.common.utils.TimeUtils;
import com.young.rocketmq.config.Const;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

/**
 * Simple Message Example - Consumer.
 *
 * <p>Test consumer for the batch-delivery producer: subscribes to the
 * {@code BatchTest} topic, logs every batch of messages handed to the listener,
 * and shuts the consumer down after a fixed wait.
 *
 * @author ：<a href="mailto:youngkun2016@163.com">young</a>
 * @date ：Created in 2020/3/29
 */
public class BatchConsumer {
    private static final Logger logger = LogManager.getLogger(BatchConsumer.class);

    /**
     * Starts the push consumer, keeps it alive for a fixed period, then shuts it down.
     *
     * @param args command-line arguments (unused)
     * @throws MQClientException if subscribing to the topic or starting the consumer fails
     */
    public static void main(String[] args) throws MQClientException {

        // Instantiate with specified consumer group name.
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("BatchProducerGroupName");

        // Specify name server addresses.
        consumer.setNamesrvAddr(Const.NAME_SRV);

        // Subscribe to one or more topics to consume; "*" is the tag sub-expression.
        consumer.subscribe("BatchTest", "*");

        // Register callback to execute on arrival of messages fetched from brokers.
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                // Parameterized logging: no eager String.format and no stray "%n" in the event.
                logger.info("{} Receive New Messages: {}", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        // Launch the consumer instance.
        consumer.start();
        try {
            logger.info("Consumer Started.");
            // Keep the process alive so the listener can receive messages, then exit.
            // NOTE(review): the original comment said "exit after half an hour", but
            // 180000 is 3 minutes if TimeUtils.sleep takes milliseconds - confirm the
            // unit and the intended duration.
            TimeUtils.sleep(180000);
        } finally {
            // Always release the client, even if the wait ends abnormally.
            consumer.shutdown();
        }
    }
}
